查看原文
其他

RisingWave 1.7 发布!新增自适应并行度、支持加密函数等多个功能特性

RisingWave 社区 RisingWave中文开源社区
2024-09-10
我们非常高兴地宣布:RisingWave 1.7 版本正式发布!新版本包含了许多重要更新,如增强的 UDF、新 SQL 函数、新查询句法等。现在让我们快速了解本次更新的主要亮点

1自适应并行度

1.7 版开始,所有新增的流处理作业将默认启用自适应并行度。也就是说,如果您向集群添加了额外的节点或 CPU,RisingWave 将会自动调整并行度以充分利用新增的资源,为用户提供更为顺畅的扩展体验。如果您尚未准备好使用这一功能,可以将 disable_automatic_parallelism_control 设置为 true,在整个会话中禁用此选项,以最大程度地确保系统的稳定性。

如有需要,可以为 Table、物化视图或 Sink 设置固定的并行度,以便控制流带宽,并方便预测资源分配。

ALTER TABLE table_name SET PARALLELISM = 8;

更多细节,请查看:

  • Cluster scaling[1] 「集群扩展」

2从外部 Source 读取所有列与 Schema

在 RisingWave 中,创建 Table 或 Source 以从外部读取数据时,必须定义 Schema,即指定每个列的列名和数据类型,除非数据采用 Protobuf 格式。

若存在大量列,逐一定义每列可能会很繁琐。此次更新后,您可使用 * 表示应在 Table 或 Source 中包含来自 Schema Registry 的所有列。额外生成的列也可以包括在内。请记住,此方法仅在定义了外部 Schema Registry 时才有效。

CREATE TABLE from_kafka (
        *,
        gen_i32_field int AS int32_field + 2)
WITH (
  connector = 'kafka',
  topic = 'test',
  properties.bootstrap.server = 'message_queue:29092')
FORMAT UPSERT ENCODE AVRO (
  schema.registry = 'http://message_queue:8081');

更多细节,请查看:

  • CREATE TABLE[2] 「CREATE TABLE 命令」
  • CREATE SOURCE[3]「CREATE SOURCE 命令」

3INCLUDE 子句

现在支持在 CREATE SOURCE 和 CREATE TABLE 使用 INCLUDE 子句,用于添加不属于有效负载部分的消息组件作为附加列。例如,INCLUDE 子句可以添加消息键(Message Key)、时间戳或主题(Topic Header)这样的列,以进行下一步分析。比如,您可以在创建 Table 或 Source 时添加时间戳列:

CREATE SOURCE s_name(
    ...
INCLUDE timestamp AS include_ts
WITH (...)
FORMAT ... ENCODE ...;

使用 AS 指定别名是可选的,但如果没有指定,列名则将根据所使用的连接器类型自动生成。因此,如果上述 Source 连接的是 Kafka 消息队列,生成的列名将是 _rw_kafka_timestamp

在从数据流中摄取数据时,还有以下选项可选,包括键、分区、偏移、时间戳或 Header。

INCLUDE {key | partition | offest | timestamp | header} [AS col_name]

此外,对于 UPSERT 类型的 Source 和 Table,必须要用 INCLUDE KEY,因为 RisingWave 将使用此列执行 UPSERT 语义。在这种情况下,不能将主键定义为多个列。

更多细节,请查看:

  • INCLUDE clause[4] 「 INCLUDE 子句」

4增强 UDF 支持

1.7 版本增强了对用户自定义函数(UDF)的支持。UDF 让您能在数据转换和计算方面更灵活。您可以选择使用编程语言定义外部 UDF,或在 RisingWave 内使用 SQL 定义 UDF。一旦创建了 UDF,就可以像使用任何其他内置函数一样在 SQL 查询中使用它们。此次更新,我们新增用 JavaScript 和 Rust 定义的外部 UDF,同时新增 SQL UDF。

JavaScript UDF

您可以通过使用 CREATE FUNCTION 命令在 RisingWave 中创建 JavaScript UDF。与其他语言定义的 UDF 相比,创建 JavaScript UDF 最为简单,因为 RisingWave 中内置了 QuickJS 虚拟机。您不需要在本地机器上进行额外设置,也不需要在单独的文件中定义函数。JavaScript UDF 仅限于计算任务,如果您的数据需要简单但重复的转换,那么它们是完美的选择。

CREATE FUNCTION gcd(a int, b intRETURNS int LANGUAGE javascript AS $$
    if(a == null || b == null) {
        return null;
    }
    while (b != 0) {
        let t = b;
        b = a % b;
        a = t;
    }
    return a;
$$;

Rust UDF

您也可以使用 Rust 在 RisingWave 中创建 UDF。您可以选择将 WebAssembly (WASM) 二进制文件以 base64 编码嵌入到 SQL 中,或从对象存储加载 WASM 二进制文件。与使用 Python 和 Java 定义的 UDF 相比,用 Rust 定义的 UDF 在性能方面有更好的表现,但仅限于计算任务。与使用 JavaScript 创建的 UDF 不同,Rust 需要额外的设置。具体请参见下方链接。

SQL UDF

您还可以用 SQL 在 RisingWave 内部创建 UDF。通过这种方式,您可以封装常用逻辑,并将复杂的操作和计算抽象为可重用的函数,提高查询的可读性。此外,创建 UDF 时,您可以调用内置的 SQL 函数和预定义的 SQL UDF,但目前不支持递归定义。

以下是一个简单的 SQL UDF 示例,用于将两个参数相加。对于输入参数,在定义时,您可以选择是否命名。以下例子中的参数是未命名的。

CREATE FUNCTION add(INTINT)
    RETURNS int
    LANGUAGE SQL
    AS $$select $1 + $2$$;

SELECT add(1-1);
----返回结果
0

更多细节,请查看:

  • Use UDFs in JavaScript[5] 「用 JavaScript 使用 UDF」
  • Use UDFs in Rust[6] 「用 Rust 使用 UDF」
  • SQL UDFs[7]

5支持加密函数

加密函数是将输入内容转换为不可读格式的算法,用来保护数据安全。对于加密后的数据,可以通过特定的密钥或算法解密,将其还原回其原始形式。您现在可以使用原始加密函数 encrypt 和 decrypt 来加解密数据。需要注意的是,这两个函数只是根据给定的算法对输入内容加密,不提供额外的安全措施。以下是它们的句法:

encrypt(data bytea, key bytea, type text) -> bytea
decrypt(data bytea, key bytea, type text) -> bytea

加密或解密的具体算法由 type 指定,type 由 algorithmmode 和 padding 构成。其具体设置可参见下方链接。

 algorithm [-mode] [/pad:padding]

以下是一个示例,用于加密”Hello, World!”,其使用 aes 算法、cbc 模式,并以 pkcs 方式填充。

SELECT encrypt('Hello, World!''my_secret_key''aes-cbc/pad:pkcs');
----返回结果
\\330\\317\\204\\357\\327\\367\\206\\241\\253\\024\\303\\013\\215\\030\\231\\257
(1 row)

更多细节,请查看:

  • Cryptographic functions[8] 「加密函数」

6CDC 连接器的增强功能

本次更新为 MySQL 和 PostgreSQL CDC 连接器引入了一些增强功能。

首先,创建共享的 MySQL 和 PostgreSQL Source 时,默认的 transactional 参数值已更改为 true。如果创建 CDC Table,默认值则为 false。此参数允许您对 CDC Table 或 Source 启用或禁用事务。

现在,在创建 MySQL 和 PostgreSQL CDC Table 时,也可使用 snapshot 参数。当此参数设置为 false 时,RisingWave 中的 CDC Table 将仅消费在 Table 创建后发生的上游事件。上游 MySQL 或 PostgreSQL Table 中的任何现有数据将不会出现在 RisingWave 的 Table 中。

CREATE TABLE orders_no_backfill (
   order_id int,
   order_date date,
   customer_name string,
   PRIMARY KEY (order_id)
WITH (
   snapshot = 'false'
FROM pg_source TABLE 'public.orders_tx';

更多细节,请查看:

  • Ingest data from MySQL CDC[9]「从 MySQL CDC 导入数据」
  • Ingest data from PostgreSQL CDC[10]「从 PostgreSQL CDC 导入数据」

7总结

以上只是 RisingWave 1.7 版本新增的部分功能,如果您想了解本次更新的完整列表,包括新函数和集群设置等,请查看更详细的发布说明[11]

如果您想提前了解下个月的版本及其新功能,请访问 RisingWave GitHub repository[12]

如果您想了解 RisingWave 的所有动态,请在官网[13]订阅我们的邮件月刊,关注我们的 Twitter[14] 和 LinkedIn[15]。同时,也欢迎您加入我们的微信中文社群和 Slack[16] 英文社区,与我们的工程师还有全球各地的 RisingWave 爱好者交流!

参考资料
[1]

Cluster scaling: https://docs.risingwave.com/docs/current/k8s-cluster-scaling/

[2]

CREATE TABLE: https://docs.risingwave.com/docs/current/sql-create-table/

[3]

CREATE SOURCE: https://docs.risingwave.com/docs/current/sql-create-source/

[4]

INCLUDE clause: https://docs.risingwave.com/docs/current/include-clause/

[5]

Use UDFs in JavaScript: https://docs.risingwave.com/docs/current/udf-javascript/

[6]

Use UDFs in Rust: https://docs.risingwave.com/docs/current/udf-rust/

[7]

SQL UDFs: https://docs.risingwave.com/docs/current/sql-create-function/#sql-udfs

[8]

Cryptographic functions: https://docs.risingwave.com/docs/current/sql-function-cryptographic-functions/

[9]

Ingest data from MySQL CDC: https://docs.risingwave.com/docs/current/ingest-from-mysql-cdc/

[10]

Ingest data from PostgreSQL CDC: https://docs.risingwave.com/docs/current/ingest-from-postgres-cdc/

[11]

发布说明: https://docs.risingwave.com/release-notes/

[12]

RisingWave GitHub repository: https://github.com/risingwavelabs/risingwave

[13]

官网: https://www.risingwave.com/

[14]

Twitter: https://www.risingwave-labs.com/twitter?__hstc=32235681.25e2c16d83245fd21429e8d1b780a47c.1692637175278.1697232675775.1697659305484.23&__hssc=32235681.3.1697659305484&__hsfp=1531353701

[15]

LinkedIn: https://www.risingwave-labs.com/linkedin?__hstc=32235681.25e2c16d83245fd21429e8d1b780a47c.1692637175278.1697232675775.1697659305484.23&__hssc=32235681.3.1697659305484&__hsfp=1531353701

[16]

Slack: https://www.risingwave-labs.com/slack?__hstc=32235681.25e2c16d83245fd21429e8d1b780a47c.1692637175278.1697232675775.1697659305484.23&__hssc=32235681.3.1697659305484&__hsfp=1531353701


关于 RisingWave 

RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于为用户提供极致简单、高效的流数据处理与管理能力。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,并助力用户极大地简化流计算架构,轻松搭建稳定且高效的流计算应用。RisingWave 始终聆听来自社区的声音,并积极回应用户的反馈。目前,RisingWave 已汇聚了近 150 名开源贡献者和近 3000 名社区成员。全球范围内,已有上百个 RisingWave 集群在生产环境中部署。



往期推荐

技术内幕

如何上手 RisingWave 👉 新手入门教程

RisingWave 中文用户文档上线,阅读更高效!

深入探索 RisingWave 中的高可用性与容错机制

深入理解 RisingWave 流处理引擎(三):触发机制

深入理解 RisingWave 流处理引擎(二):计算模型

深入理解 RisingWave 流处理引擎(一):总览


用户案例
视源股份(CVTE)IT 流计算应用历程
尘锋 SCRM 如何使用 RisingWave 实时打宽
RisingWave 在超百亿管理规模对冲基金公司中的应用
金融科技公司 Kaito 使用 RisingWave 实现实时智能化
龙腾出行如何通过 RisingWave 实现实时数据分析
RisingWave 助力乾象投资打造实时监控平台
继续滑动看下一个
RisingWave中文开源社区
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存